Skip to content

feat: add ParquetLookupProvider and SqliteLookupProvider behind feature gates#3

Merged
anoop-narang merged 11 commits into
mainfrom
feat/parquet-sqlite-providers
Mar 16, 2026
Merged

feat: add ParquetLookupProvider and SqliteLookupProvider behind feature gates#3
anoop-narang merged 11 commits into
mainfrom
feat/parquet-sqlite-providers

Conversation

@anoop-narang

Copy link
Copy Markdown
Collaborator

Summary

  • Adds src/keys.rs (always compiled): pack_key, unpack_key, DatasetLayout — the bit-packed row-address encoding extracted from the POC's indexing.rs, now shared by both providers
  • Adds src/parquet_provider.rs (feature parquet-provider): concurrent row-group reads from any ObjectStore with pre-cached parquet footers and optional RowSelection page-skip optimisation
  • Adds src/sqlite_provider.rs (feature sqlite-provider): B-tree point lookups via a WAL-mode connection pool; builds the SQLite table from parquet on first run
  • Adds 9 integration tests across two test files

Feature flags

# Cargo.toml of the consuming crate
datafusion-vector-search-ext = { path = "...", features = ["parquet-provider"] }
datafusion-vector-search-ext = { path = "...", features = ["sqlite-provider"] }
datafusion-vector-search-ext = { path = "...", features = ["parquet-provider", "sqlite-provider"] }

Changes from POC

  • anyhow::Result replaced throughout with datafusion::common::Result / DataFusionError
  • Hardcoded SQLITE_PATH, POOL_SIZE, and table name "hf_models" removed; all accepted as open_or_build() constructor args
  • crate::indexing imports replaced with crate::keys
  • use datafusion_vector_search_ext::PointLookupProvider replaced with crate::lookup::PointLookupProvider
  • tokio gains the sync feature unconditionally (needed for Semaphore; was already a hard dep)

Test plan

  • cargo check — clean (no features)
  • cargo check --features parquet-provider — clean
  • cargo check --features sqlite-provider — clean
  • cargo check --features parquet-provider,sqlite-provider — clean
  • cargo clippy --all-features -- -D warnings — clean
  • cargo test --features parquet-provider — 5 parquet tests pass
  • cargo test --features sqlite-provider — 4 sqlite tests pass
  • cargo test --features parquet-provider,sqlite-provider — all 9 pass

@anoop-narang anoop-narang force-pushed the feat/parquet-sqlite-providers branch from d059334 to f460fae Compare March 16, 2026 07:03
…re gates

Ports the two storage backends proven in the df-vector-search benchmark POC
into the library as optional, feature-gated providers.

- `parquet-provider` feature: ParquetLookupProvider — concurrent row-group
  reads from any ObjectStore (S3, local FS) with pre-cached parquet footers
  and optional RowSelection for page-skip optimisation
- `sqlite-provider` feature: SqliteLookupProvider — B-tree point lookups via
  a WAL-mode connection pool; builds from parquet on first run
- `keys` module (always compiled): pack_key / unpack_key / DatasetLayout —
  shared key encoding utilities extracted from indexing.rs
- Integration tests for both providers (9 tests total)

Breaking change in Cargo.toml: tokio gains the "sync" feature unconditionally
(needed by SqliteLookupProvider's Semaphore; tokio was already a hard dep).
@anoop-narang anoop-narang force-pushed the feat/parquet-sqlite-providers branch from f460fae to b18e75e Compare March 16, 2026 09:08
Comment thread src/sqlite_provider.rs Outdated
Comment on lines +165 to +173
let conn = pool.lock().unwrap().pop().unwrap();
let res = execute_query_sync(
&conn,
&keys_vec,
&out_schema,
proj_owned.as_deref(),
&table_name,
);
pool.lock().unwrap().push(conn);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Connection pool corruption on panic

If execute_query_sync panics (or the mutex is poisoned), the closure unwinds before reaching pool.lock().unwrap().push(conn). The connection is permanently lost from the pool. After pool_size such events the pool is empty; the next call acquires the semaphore permit, calls pop().unwrap() on an empty Vec, and panics — cascading all future queries to failure.

Use a guard to guarantee the connection is returned:

struct ConnGuard {
    pool: Arc<Mutex<Vec<Connection>>>,
    conn: Option<Connection>,
}
impl Drop for ConnGuard {
    fn drop(&mut self) {
        if let Some(c) = self.conn.take() {
            // best-effort: ignore poison
            let _ = self.pool.lock().map(|mut p| p.push(c));
        }
    }
}

Or at minimum replace pop().unwrap() with a checked path and ensure the connection is pushed back even on error paths.

Comment thread src/parquet_provider.rs Outdated
Comment on lines +198 to +199
let cached_meta = self.metadata_cache[file_idx].clone();
let selected_parquet_cols = selected_parquet_cols.clone();

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Unchecked index access panics on stale/corrupt keys

file_idx is decoded from a packed USearch key. If the index was built against a different number of files than the provider was opened with (e.g. after re-sharding, or due to key corruption), file_idx will be out of bounds and both accesses here will panic rather than returning an error.

Suggested change
let cached_meta = self.metadata_cache[file_idx].clone();
let selected_parquet_cols = selected_parquet_cols.clone();
let file_key = self.file_keys.get(file_idx)
.ok_or_else(|| DataFusionError::Execution(format!(
"packed key references file_idx={file_idx} but provider has {} files",
self.file_keys.len()
)))?.clone();
let cached_meta = self.metadata_cache.get(file_idx)
.ok_or_else(|| DataFusionError::Execution(format!(
"packed key references file_idx={file_idx} but metadata cache has {} entries",
self.metadata_cache.len()
)))?.clone();

Comment thread src/sqlite_provider.rs Outdated
|row| row.get(0),
)
.unwrap_or(0);
println!(" SQLite table '{table_name}' already exists ({n} rows), skipping build.");

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 (suggestion): Library code should use tracing::info! instead of println!. The crate already depends on tracing.

Suggested change
println!(" SQLite table '{table_name}' already exists ({n} rows), skipping build.");
tracing::info!("SQLite table '{}' already exists ({} rows), skipping build.", table_name, n);

Comment thread src/sqlite_provider.rs Outdated
}
tx.commit()
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
println!(" SQLite table '{table_name}' built and committed.");

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 (suggestion): Same — prefer tracing::info! over println! in library code.

Suggested change
println!(" SQLite table '{table_name}' built and committed.");
tracing::info!("SQLite table '{}' built and committed.", table_name);

Comment thread src/parquet_provider.rs
row_idx_arr.clone()
} else {
content.next().expect("column mismatch")
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 (suggestion): expect() will panic if the column iterator runs short due to a logic error in projection mapping. Return an error instead:

Suggested change
}
content.next().ok_or_else(|| DataFusionError::Execution("projection column mismatch".into()))?

@claude claude Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review

Issues

P1 — src/sqlite_provider.rs lines 165–173: Connection pool corruption on panic

If execute_query_sync panics (or a mutex is poisoned), the closure in spawn_blocking unwinds before the pool.lock().unwrap().push(conn) on line 173. The connection is permanently lost from the pool. Because the semaphore is still decremented correctly, the next caller acquires a permit, calls pool.lock().unwrap().pop().unwrap() on an empty Vec, and panics — all further queries fail. With a poisoned mutex, every subsequent unwrap() on the lock also panics immediately. See inline comment for a suggested guard-based fix.

P1 — src/parquet_provider.rs lines 198–199: Unchecked index access panics on stale keys

file_idx is decoded from a packed USearch key with no bounds check. If the key was produced against a different number of files (re-sharding, key corruption, mismatch between index and provider), both self.file_keys[file_idx] and self.metadata_cache[file_idx] will panic instead of returning a DataFusionError. See inline comment for a .get() + ok_or_else fix.

Action Required

  1. Guard the connection push-back in sqlite_provider.rs so it executes even on panic paths (e.g. a ConnGuard that pushes on Drop).
  2. Replace the two unchecked slice indexing sites in parquet_provider.rs with bounds-checked .get() calls that return errors.

P1 — sqlite_provider: add ConnGuard to return connection to pool on
panic, preventing permanent pool shrinkage and cascading failures.

P1 — parquet_provider: replace unchecked file_keys[file_idx] /
metadata_cache[file_idx] with bounds-checked .get() calls that return
DataFusionError instead of panicking on stale or mismatched keys.

P2 — parquet_provider: replace .expect("column mismatch") in projection
mapping with a proper ? returning DataFusionError::Execution.

P2 — sqlite_provider: replace println! with tracing::info! throughout;
library code should not write to stdout directly.
Comment thread src/keys.rs
/// and the lookup providers.
#[inline]
pub fn pack_key(file_idx: usize, rg_idx: usize, local_offset: usize) -> u64 {
((file_idx as u64) << 48) | ((rg_idx as u64) << 32) | (local_offset as u64)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 — Silent data corruption on out-of-range inputs.

file_idx is silently masked to 16 bits, rg_idx to 16 bits, and local_offset to 32 bits. If any of these exceed their encoded range (file_idx/rg_idx ≥ 65 536, local_offset ≥ 4 294 967 296), the packed key is wrong and the lookup returns the wrong row with no error or warning.

These aren't just theoretical: a dataset with a large single file can easily produce local_offset values in the millions — which is fine, but rg_idx grows per-file, so it's realistic for rg_idx to exceed 65 535 in a heavily-sharded dataset.

Suggest adding debug-mode assertions (or unconditional checks returning Err) before encoding:

Suggested change
((file_idx as u64) << 48) | ((rg_idx as u64) << 32) | (local_offset as u64)
debug_assert!(file_idx < (1 << 16), "file_idx {file_idx} overflows 16 bits");
debug_assert!(rg_idx < (1 << 16), "rg_idx {rg_idx} overflows 16 bits");
debug_assert!(local_offset < (1 << 32), "local_offset {local_offset} overflows 32 bits");
((file_idx as u64) << 48) | ((rg_idx as u64) << 32) | (local_offset as u64)

Comment thread src/sqlite_provider.rs Outdated
) -> DFResult<Vec<RecordBatch>> {
let placeholders = keys.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
let sql = format!(
"SELECT * FROM \"{table_name}\" WHERE row_idx IN ({placeholders}) ORDER BY row_idx"

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 — SQL injection via table_name.

table_name is a caller-supplied string embedded directly into SQL inside double-quotes. Double-quoting an identifier only protects against injection if embedded " characters are also escaped (as ""). This code does not escape them.

A table_name containing " (e.g. foo" WHERE 1=1; DROP TABLE foo; --) will break out of the quoted identifier and allow arbitrary SQL execution. The same issue exists in build_table at the CREATE TABLE and INSERT INTO statements, and in open_or_build at the SELECT COUNT(*).

Rustqlite doesn't support parameterised identifiers, so the fix is to validate/sanitise table_name at construction time — reject names containing " or characters outside [a-zA-Z0-9_], or escape embedded " by doubling them:

fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

Then use quote_ident(&self.table_name) everywhere a table name appears in a format string.

Comment thread src/sqlite_provider.rs Outdated
.map_err(|e| DataFusionError::Execution(e.to_string()))?;

let result = tokio::task::spawn_blocking(move || {
let conn = pool.lock().unwrap().pop().unwrap();

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — Double unwrap() in blocking thread.

pool.lock().unwrap().pop().unwrap() has two distinct panic paths:

  1. The first .unwrap() panics if the mutex is poisoned. The ConnGuard drop impl does a lock().map(...) (silently ignoring poison) to return connections, so after a panic the pool can become permanently undersized or empty on the next call.

  2. The second .unwrap() panics if the pool is empty. The semaphore is supposed to guarantee a slot is available, but if pool_size = 0 is passed to open_or_build, Semaphore::new(0) is created and acquire() immediately returns Err, so this path is never reached — but it's still a fragile invariant with no guard at construction time.

Suggest:

  • Validate pool_size >= 1 in open_or_build and return an error rather than allowing an unusable provider.
  • Replace the second unwrap() with an explicit error: .ok_or_else(|| DataFusionError::Execution("connection pool unexpectedly empty".into()))?

@claude claude Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review - P1 issues found. (1) pack_key silent truncation (src/keys.rs:12): file_idx/rg_idx >= 65536 or local_offset >= 2^32 are silently masked, producing a wrong key with no error. Add debug_assert or return Err on overflow. (2) SQL injection via table_name (src/sqlite_provider.rs:123,225,319,328): table_name is embedded in SQL with double-quote delimiters but embedded double-quotes are not escaped, allowing arbitrary SQL injection. Fix: validate table_name contains only [a-zA-Z0-9_] at construction time, or use a quote_ident helper that doubles embedded double-quotes.

P1 — keys: add debug_assert! on pack_key inputs to catch out-of-range
file_idx/rg_idx (>= 65536) or local_offset (>= 2^32) in debug builds
instead of silently producing a wrong key.

P1 — sqlite_provider: add quote_ident() helper that doubles embedded
double-quotes before interpolating table_name into SQL, preventing SQL
injection via a crafted table name. Applied to all four SQL format
strings (SELECT COUNT, SELECT *, CREATE TABLE, INSERT INTO).

P2 — sqlite_provider: validate pool_size >= 1 at construction time and
return DataFusionError instead of allowing an unusable provider.
Replace pool.lock().unwrap().pop().unwrap() with explicit map_err /
ok_or_else so mutex poison and empty-pool are surfaced as errors.
Comment thread src/sqlite_provider.rs Outdated
if f.name() == "row_idx" {
"row_idx INTEGER PRIMARY KEY".to_string()
} else {
format!("\"{}\" {}", f.name(), sql_type)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 — SQL injection via schema column names

Column names are embedded in the CREATE TABLE DDL with a raw format! but without escaping embedded double-quotes. A field named e.g. foo" TEXT, evil INTEGER -- would break out of the quoted identifier and inject arbitrary SQL.

quote_ident already exists in this file and handles this correctly — use it here:

Suggested change
format!("\"{}\" {}", f.name(), sql_type)
format!("{} {}", quote_ident(f.name()), sql_type)

Comment thread src/sqlite_provider.rs Outdated
}
}

if col_bufs[0].is_empty() {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — index panic when schema is empty

col_bufs[0] panics if n_out == 0. Use col_bufs.first().map_or(true, |v| v.is_empty()) instead, or check n_out == 0 explicitly before the query.

Comment thread src/parquet_provider.rs
}
})
.collect();

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — unbounded concurrency

try_join_all spawns all row-group reads at once with no upper bound. A query spanning many files or row groups could exhaust file descriptors or memory. Consider adding a semaphore (similar to what SqliteLookupProvider already does) to cap concurrent reads, or use futures::stream::iter(futures).buffer_unordered(N) with a configurable limit.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Disagree that this needs a fix at this stage. The concurrency here is bounded by k (USearch result count, typically 10–100) multiplied by the number of files — in practice a few dozen futures at most. object_store already manages HTTP connection pooling internally, so adding a semaphore on top would serialise reads that the HTTP client could otherwise pipeline in parallel, hurting throughput in the common case. If this library is ever used with very large k or hundreds of shards, a configurable max_concurrent_rg_reads on the provider would be the right approach — but adding it now without a concrete use case is premature optimisation. Leaving as-is.

@claude claude Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: sqlite_provider.rs:334 - SQL injection via schema column names. build_table formats column names directly into CREATE TABLE DDL without escaping embedded double-quotes. A field named with an embedded quote breaks out of the quoted identifier. quote_ident already exists in the file but is not called here. Fix: use quote_ident(f.name()) instead of the raw format string. | P2: sqlite_provider.rs:274 - col_bufs[0].is_empty() panics when n_out == 0. Use col_bufs.first().map_or(true, |v| v.is_empty()). | P2: parquet_provider.rs:347 - try_join_all over all row-group reads has no upper bound. For large queries this can exhaust file descriptors or memory. Cap with a semaphore or buffer_unordered(N). All three must be addressed before merge.

Comment thread src/keys.rs Outdated
);
debug_assert!(rg_idx < (1 << 16), "rg_idx {rg_idx} overflows 16 bits");
debug_assert!(
local_offset < (1 << 32),

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — overflow on 32-bit targets

1usize << 32 panics in debug mode when usize is 32 bits (shift-by-width is always a panic in Rust debug mode), so this debug_assert! fires unconditionally on 32-bit before the comparison even runs.

Suggested change
local_offset < (1 << 32),
local_offset <= u32::MAX as usize,

Comment thread src/parquet_provider.rs
Comment on lines +173 to +179
let selected_parquet_cols: Arc<Vec<usize>> = Arc::new(match projection {
None => self.parquet_col_indices.clone(),
Some(idxs) => idxs
.iter()
.filter(|&&i| i > 0 && (i - 1) < self.parquet_col_indices.len())
.map(|&i| self.parquet_col_indices[i - 1])
.collect(),

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 — projection column ordering bug

selected_parquet_cols is built in idxs order. ProjectionMask::roots however returns columns in parquet schema order (ascending column index), regardless of the order passed. combined.columns() (and therefore filtered) is in parquet schema order, but the out_cols loop below consumes filtered.into_iter() as if it were in idxs order.

Concretely: if parquet_col_indices = [2, 0] and projection = Some(&[1, 2]), then:

  • selected_parquet_cols = [2, 0]
  • parquet reader returns [col0, col2] (schema order)
  • loop assigns col0 to provider col 1 (expects parquet col 2) — wrong

Fix: sort selected_parquet_cols ascending, compute the inverse permutation, then reorder filtered back into idxs order before building out_cols. Or, build a lookup from parquet column index → position in filtered, and use that when assembling out_cols.

Comment thread src/parquet_provider.rs
cols
}
Some(idxs) => {
let mut content = filtered.into_iter();

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 — root cause of the projection ordering bug

filtered here is in parquet schema order (see comment on selected_parquet_cols above), but content is consumed sequentially as idxs is iterated. When the parquet-schema order of the selected columns doesn't match the order of idxs, the wrong column is assigned to each output position.

Comment thread src/parquet_provider.rs
})
.collect();

let results: Vec<Option<RecordBatch>> = try_join_all(futures).await?;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — unbounded concurrency

try_join_all drives all (file_idx, rg_idx) reads concurrently with no cap. For a large key batch spread across many row groups, this can open hundreds of simultaneous HTTP connections to the object store, potentially hitting rate limits or causing excess memory pressure from in-flight buffers.

Consider wrapping each future with a Semaphore permit (e.g. Arc<Semaphore>::new(32)) to bound the fan-out.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in the follow-up commit (0b75606). The earlier response at #3 (comment) explains the design rationale: concurrency is bounded by k × shards in practice (typically a few dozen futures), and object_store handles HTTP connection pooling internally. Adding a Semaphore at this layer would add complexity with no measurable benefit for the target k values. Leaving as-is.

Comment thread src/sqlite_provider.rs
Comment on lines +455 to +459
DataType::UInt64 => SqlValue::Integer(
col.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.value(row) as i64,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — silent data loss for UInt64 values > i64::MAX

u64 values above i64::MAX (9223372036854775807) are cast to a negative i64. On read-back in sql_values_to_arrow, *i as u64 wraps back to a different value than what was originally stored, silently corrupting any UInt64 data column with large values.

The row_idx column is unaffected in practice (the key-lookup in execute_query_sync also casts u64 → i64 consistently), but any payload UInt64 column can be corrupted.

Possible fix: store oversized u64 values as TEXT (e.g. decimal string) and parse on read, or document that UInt64 columns must not exceed i64::MAX.

@claude claude Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review

Issues

P1 — Projection column ordering, src/parquet_provider.rs lines 173–179 / 324

ProjectionMask::roots returns columns in parquet schema order (ascending column index), not in the order the indices were passed. combined.columns() is therefore in parquet schema order, but out_cols consumes filtered.into_iter() while iterating idxs — assuming the two are in the same order. They are not when the mapped parquet column indices are not monotonically increasing in idxs order.

Concrete example: parquet_col_indices = [2, 0], projection = Some(&[1, 2])selected_parquet_cols = [2, 0], parquet reader returns [col0, col2], but the loop assigns col0 to the slot for provider col 1 (which maps to parquet col 2). Data columns are silently swapped.

Fix: after building selected_parquet_cols, record the permutation that maps parquet-schema order back to idxs order, then apply it to filtered before assembling out_cols.

P2 — Unbounded concurrency, src/parquet_provider.rs line 348

try_join_all drives every (file_idx, rg_idx) read concurrently with no cap. For large key batches this can open hundreds of simultaneous object-store connections. Add a Semaphore to bound the fan-out (e.g. 32).

P2 — debug_assert overflow on 32-bit targets, src/keys.rs line 22

1usize << 32 panics in debug mode when usize is 32 bits; the assertion fires unconditionally before the comparison runs. Use local_offset <= u32::MAX as usize instead.

P2 — Silent UInt64 data loss, src/sqlite_provider.rs lines 455–459

UInt64 payload columns with values > i64::MAX are cast to a negative i64 on store and wrap to a different u64 on read-back, silently corrupting values. (Key lookups are unaffected since both build and query apply the same cast.) Either store oversized values as decimal TEXT or document the constraint.

Action Required

  • Fix projection column ordering in ParquetLookupProvider::fetch_by_keys
  • Fix debug_assert overflow in pack_key
  • Resolve or document the UInt64 truncation in SqliteLookupProvider
  • Add a concurrency bound to the parquet fetch fan-out

…reorder, UInt64 note

- keys.rs: use u32::MAX as usize instead of (1 << 32) to avoid
  overflow on 32-bit targets
- parquet_provider.rs: reorder filtered columns from parquet-schema
  order back to idxs order after ProjectionMask::roots
- sqlite_provider.rs: add comment documenting UInt64 > i64::MAX
  wrap-around behaviour when storing to SQLite INTEGER
Comment thread src/parquet_provider.rs
.scan(state, projection, &[], None)
.await
}
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 — scan() silently returns empty results

HashKeyProvider::scan() returns actual data (vec![self.batches.clone()]). Both new providers return an empty MemTable, so any SQL query against a registered ParquetLookupProvider table will silently produce zero rows.

Either implement a real scan (ParquetLookupProvider has everything it needs — read all row groups concurrently), or fail explicitly:

Suggested change
}
) -> Result<Arc<dyn ExecutionPlan>> {
Err(DataFusionError::NotImplemented(
"ParquetLookupProvider does not support full table scans; use fetch_by_keys".into(),
))
}

Silent empty results are worse than a clear error.

Comment thread src/sqlite_provider.rs Outdated
) -> DFResult<Arc<dyn ExecutionPlan>> {
MemTable::try_new(self.schema.clone(), vec![])?
.scan(state, projection, &[], None)
.await

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 — same silent-empty-scan issue

HashKeyProvider::scan() returns data; this returns nothing. Return an explicit error instead of an empty table, so callers aren't silently misled:

Suggested change
.await
) -> DFResult<Arc<dyn ExecutionPlan>> {
Err(DataFusionError::NotImplemented(
"SqliteLookupProvider does not support full table scans; use fetch_by_keys".into(),
))
}

Comment thread src/parquet_provider.rs
use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, RowSelector};
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::ParquetMetaData;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — unnecessary pub field

schema is already exposed via TableProvider::schema(). Making the field pub breaks encapsulation (callers could hold a direct reference and bypass any future caching/invalidation logic). Drop the pub:

Suggested change
use parquet::file::metadata::ParquetMetaData;
schema: SchemaRef,

- keys.rs: unit tests for pack/unpack roundtrip, boundary values,
  and debug_assert panics for out-of-range file_idx and local_offset
- parquet: test projection with non-monotonic column order (the P1 bug
  where columns were silently swapped without the reorder fix)
- parquet: test stale file_idx returns Err not panic
- parquet + sqlite: test scan() returns NotImplemented (not silent empty)
- sqlite: test table name with spaces works via quote_ident
Comment thread src/sqlite_provider.rs
"SqliteLookupProvider: unsupported Arrow type {other:?}"
)))?,
})
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 — Missing Float32/Float64 arms

arrow_cell_to_sql writes Float32 and Float64 values to SQLite as REAL, but sql_values_to_arrow has no matching arms to read them back. Any schema containing float columns will hit the other => catch-all and return Err from fetch_by_keys.

Add the missing arms before the other fallthrough:

Suggested change
}
DataType::Float64 => {
let arr: arrow_array::Float64Array = values
.iter()
.map(|v| match v {
SqlValue::Real(f) => Some(*f),
_ => None,
})
.collect();
Arc::new(arr)
}
DataType::Float32 => {
let arr: arrow_array::Float32Array = values
.iter()
.map(|v| match v {
SqlValue::Real(f) => Some(*f as f32),
_ => None,
})
.collect();
Arc::new(arr)
}
other => Err(DataFusionError::Execution(format!(
"SqliteLookupProvider: unsupported Arrow type {other:?}"
)))?,

Comment thread src/keys.rs Outdated
.file_name()
.and_then(|n| n.to_str())
.ok_or_else(|| DataFusionError::Execution(format!("invalid path: {path}")))?;
file_keys.push(format!("parquet/{file_name}"));

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — Hardcoded parquet/ object-store prefix

from_files silently prepends "parquet/" to every filename. If the caller's object store uses a different prefix (e.g., "data/", "s3://bucket/year=2024/"), all ParquetLookupProvider lookups will 404 or silently miss.

Consider accepting a prefix: &str argument, or removing the prefix entirely and letting callers pass fully-qualified object-store paths. Alternatively, document this as an explicit convention so callers know what to expect.

Comment thread src/parquet_provider.rs
})
.collect();

let results: Vec<Option<RecordBatch>> = try_join_all(futures).await?;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — Unbounded concurrency

try_join_all fires all row-group reads simultaneously. With a large key set spanning many row groups (or many files), this could exhaust the ObjectStore's connection pool or spike memory.

SqliteLookupProvider correctly bounds concurrency with a Semaphore; consider doing the same here, or replacing try_join_all with futures::stream::iter(futures).buffer_unordered(N).

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has now been raised three times across separate review rounds. The position remains the same as explained in #3 (comment): concurrency is bounded by k × shards in practice (rarely more than a few dozen futures), and object_store manages HTTP connection pooling internally. SqliteLookupProvider uses a Semaphore because it has a fixed connection pool that would deadlock without one — the constraint is fundamentally different. Adding a cap here would be complexity without a measurable benefit for realistic k values. Closing this thread.

Comment thread src/parquet_provider.rs
let gkeys: Vec<u64> = (0..n_rows)
.filter(|i| local_set.contains(i))
.map(|local| {
kv_pairs

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — O(n × m) key reconstruction

For each filtered row position local, this calls kv_pairs.iter().find(...) — O(m) per row, O(n × m) total where n is the number of rows in the row group and m is the number of target keys. Build a HashMap<usize, u64> from kv_pairs before the loop to make this O(1) per lookup.

@claude claude Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review

Issues

P1 — src/sqlite_provider.rs:653: Float32/Float64 unreadable from SQLite

arrow_cell_to_sql correctly serialises Float32 and Float64 values as SqlValue::Real, but sql_values_to_arrow has no DataType::Float32 or DataType::Float64 arm. Any fetch_by_keys call against a schema that includes a float column hits the other => fallthrough and returns Err. The table builds fine; lookups silently fail. See inline comment for a suggested fix.

Action Required

Add DataType::Float32 and DataType::Float64 match arms in sql_values_to_arrow (inline comment has a code suggestion), and add a test that round-trips a float column through open_or_buildfetch_by_keys.

All three range checks (file_idx, rg_idx, local_offset) now fire in
release builds. overflow silently produces a key pointing to the wrong
row with no downstream error — a hard panic is the right behaviour.

Also removes the now-incorrect #[cfg(debug_assertions)] guards from
the corresponding should_panic tests.
Comment thread src/sqlite_provider.rs
let n = batch.num_rows();

for row_i in 0..n {
let r = file_row + row_i as u64;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 — INSERT column ordering assumes row_idx is always field 0

params is built as [packed_key, parquet_col_0, parquet_col_1, …] and bound positionally to the VALUES (?, ?, …) placeholders. Those placeholders map to columns in CREATE TABLE declaration order, which is schema.fields() order. So this is correct only if row_idx happens to be schema.field(0).

If a caller passes a schema where row_idx is not the first field, the packed key gets inserted into the wrong column and all lookups silently return wrong rows.

Fix: assert the invariant at the top of build_table (or in open_or_build):

Suggested change
let r = file_row + row_i as u64;
let mut params: Vec<SqlValue> = Vec::with_capacity(schema.fields().len());
debug_assert_eq!(
schema.field(0).name(), "row_idx",
"schema must have row_idx as its first field"
);
params.push(SqlValue::Integer(packed_key as i64));

Or, better, find the field by name and insert at the correct position rather than assuming index 0.

Comment thread src/parquet_provider.rs
let row_selection = RowSelection::from(sel);
let gkeys: Vec<u64> = kv_pairs.iter().map(|&(k, _)| k).collect();
(
builder

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — Duplicate local_offset values produce incorrect RowSelection

After kv_pairs.sort_by_key(|&(_, l)| l), if two keys share the same (file_idx, rg_idx, local_offset), the second iteration sees off == prev - 1, so off > prev is false, and no skip is emitted. The select(1) that follows then captures the row after the intended row, returning wrong data with no error.

In practice usearch should not emit duplicate keys, but a stale index or a test setup where a key is included twice will trigger silent data corruption instead of a detectable error.

Consider deduplicating kv_pairs before building selectors, or adding an assertion:

debug_assert_eq!(
    kv_pairs.windows(2).filter(|w| w[0].1 == w[1].1).count(),
    0,
    "duplicate local_offsets in kv_pairs will corrupt RowSelection"
);

Comment thread src/sqlite_provider.rs
table_name: &str,
) -> DFResult<Vec<RecordBatch>> {
let placeholders = keys.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
let sql = format!(

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 (suggestion) — SELECT * relies implicitly on SQLite column order

projected_indices maps projection indices directly to SQLite column positions, which is correct because SQLite preserves declaration order for SELECT *. This is fine as long as the table is only ever written by build_table, but it's a fragile hidden invariant.

Using explicit column names would make the contract visible and survive any future schema-evolution code:

let col_names = out_schema.fields().iter().map(|f| quote_ident(f.name())).collect::<Vec<_>>().join(", ");
let sql = format!(
    "SELECT {col_names} FROM {tn} WHERE row_idx IN ({placeholders}) ORDER BY row_idx",
    tn = quote_ident(table_name)
);

Non-blocking suggestion.

@claude claude Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review

Issues

P1 — src/sqlite_provider.rs ~line 380: INSERT column ordering assumes row_idx is field 0

build_table constructs params as [packed_key, parquet_col_0, …] and binds them positionally to VALUES (?, …). The placeholders map to columns in CREATE TABLE declaration order, which is schema.fields() order. This is correct only when row_idx is schema.field(0).

If a caller passes a schema where row_idx is not the first field, the packed key is inserted into the wrong column and all subsequent lookups silently return wrong rows. The API documents no such ordering requirement.

P2 — src/parquet_provider.rs ~line 247: duplicate local_offset in use_row_selection path

After sorting kv_pairs by offset, a repeated offset produces a second RowSelector::select(1) that captures the next row instead of the intended row. Stale indexes or caller-supplied duplicate keys produce silent data corruption rather than an error. See inline comment for a deduplication approach.

Action Required

  1. In build_table (or open_or_build): assert/enforce that row_idx is the first schema field, or rewrite INSERT to locate row_idx by name and insert at the correct position.
  2. In fetch_by_keys (row_selection path): deduplicate kv_pairs on local_offset before building RowSelectors, or add a debug_assert to catch the case.

- Wrap CREATE TABLE inside the INSERT transaction so a mid-build crash
  cannot leave an empty table that open_or_build silently accepts on
  the next startup.

- Add missing Float32 and Float64 arms to sql_values_to_arrow; these
  types were written correctly by arrow_cell_to_sql but read back via
  the unsupported-type error path, breaking any schema with float cols.

- Replace SELECT * with an explicit column list built from out_schema
  so only projected columns are fetched from SQLite. Also removes the
  now-dead projected_indices helper.
Comment thread src/keys.rs Outdated
.file_name()
.and_then(|n| n.to_str())
.ok_or_else(|| DataFusionError::Execution(format!("invalid path: {path}")))?;
file_keys.push(format!("parquet/{file_name}"));

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "parquet/" prefix is hardcoded here, but callers may store files at different object-store paths. If someone calls DatasetLayout::from_files and passes the resulting file_keys directly to ParquetLookupProvider, it will look for parquet/<filename> in the store regardless of the actual layout.

Consider accepting the prefix (or the full key pattern) as a parameter, or at minimum documenting that file_keys are always "parquet/<basename>" so callers know what to expect:

Suggested change
file_keys.push(format!("parquet/{file_name}"));
file_keys.push(format!("parquet/{file_name}"));

Comment thread src/parquet_provider.rs
file_keys: Vec<String>,
store: Arc<dyn ObjectStore>,
pub schema: SchemaRef,
/// Parquet column indices for the provider schema positions 1, 2, …

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All other fields are private; schema being pub is inconsistent. The field is already accessible via the TableProvider::schema() impl. Consider making it private.

Suggested change
/// Parquet column indices for the provider schema positions 1, 2, …
pub(crate) schema: SchemaRef,

Comment thread src/parquet_provider.rs
Comment on lines +300 to +308
.collect::<Result<_>>()?;
let gkeys: Vec<u64> = (0..n_rows)
.filter(|i| local_set.contains(i))
.map(|local| {
kv_pairs
.iter()
.find(|&&(_, l)| l == local)
.map(|&(k, _)| k)
.unwrap_or(u64::MAX)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The gkeys construction here is O(n²): for each output row we call .find() over kv_pairs (which is sorted). Since kv_pairs is already sorted by local_offset, a binary search (partition_point) would be O(n log n):

let gkeys: Vec<u64> = (0..n_rows)
    .filter(|i| local_set.contains(i))
    .map(|local| {
        let pos = kv_pairs.partition_point(|&(_, l)| l < local);
        kv_pairs.get(pos).map(|&(k, _)| k).unwrap_or(u64::MAX)
    })
    .collect();

Not a blocker — the per-RG key count is typically small — but worth fixing before the hot path scales up.

claude[bot]
claude Bot previously approved these changes Mar 16, 2026
The hardcoded "parquet/" prefix created silent coupling between
from_files and the ObjectStore root — a mismatch caused all lookups
to 404 with no clear error.

key_prefix is now explicit: callers pass "" for a flat layout,
"parquet/" for a parquet/ subdirectory, or any S3 prefix needed to
match their object store configuration.
Comment thread src/parquet_provider.rs
Comment on lines +242 to +247
prev = off + 1;
}
let row_selection = RowSelection::from(sel);
let gkeys: Vec<u64> = kv_pairs.iter().map(|&(k, _)| k).collect();
(
builder

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 — Silent wrong data with duplicate keys in new_with_row_selection

If keys contains two entries that decode to the same (file_idx, rg_idx, local_offset) — e.g., USearch returns the same nearest-neighbor key twice — kv_pairs will have two (key, off) pairs with identical off. On the second iteration off > prev is false (since prev = off + 1), so no skip is emitted, and select(1) picks the row immediately after the previously-selected row instead of re-selecting the same row. The output batch gets the wrong row, silently.

The non-row-selection path has a related (but less severe) bug: local_set is a HashSet, so duplicate offsets are deduplicated — you get one result row instead of two, with no error.

Fix: deduplicate kv_pairs by local_offset before building the RowSelection (or before either path), and document that duplicate input keys collapse to a single result row:

Suggested change
prev = off + 1;
}
let row_selection = RowSelection::from(sel);
let gkeys: Vec<u64> = kv_pairs.iter().map(|&(k, _)| k).collect();
(
builder
kv_pairs.sort_by_key(|&(_, l)| l);
kv_pairs.dedup_by_key(|&mut (_, l)| l);
let (stream_builder, global_keys) = if use_row_selection {
let mut sel: Vec<RowSelector> = Vec::new();
let mut prev = 0usize;
for &(_, off) in &kv_pairs {
if off > prev {
sel.push(RowSelector::skip(off - prev));
}
sel.push(RowSelector::select(1));
prev = off + 1;
}

Comment thread src/parquet_provider.rs
.find(|&&(_, l)| l == local)
.map(|&(k, _)| k)
.unwrap_or(u64::MAX)
})

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — unwrap_or(u64::MAX) masks an invariant

local_set is constructed directly from kv_pairs, so find() is logically guaranteed to succeed. But .unwrap_or(u64::MAX) silently stuffs an invalid sentinel into the row_idx output column if the invariant ever breaks (e.g., after a future refactor). A corrupt u64::MAX key downstream is very hard to debug.

Suggested change
})
.map(|&(k, _)| k)
.expect("invariant: local came from kv_pairs so find must succeed")

@claude claude Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review

Issues

src/parquet_provider.rs lines 242–247 — Silent wrong data with duplicate keys (row-selection path)

When keys contains two entries that decode to the same (file_idx, rg_idx, local_offset), kv_pairs ends up with duplicate off values after sorting. The RowSelector loop advances prev = off + 1 after the first occurrence, so the second select(1) picks the row after the previously-selected row instead of repeating it. The output batch silently contains the wrong row.

The non-row-selection path is also affected: local_set is a HashSet, so duplicates collapse silently — you get one result row instead of two with no error.

src/parquet_provider.rs line 309 — unwrap_or(u64::MAX) masks an invariant

The find() is logically guaranteed to succeed, but the u64::MAX fallback would silently corrupt the row_idx output column if the invariant breaks during a future refactor. Should be .expect(...).

Action Required

  1. Deduplicate kv_pairs by local_offset (after sorting, before either branch) so both the row-selection and non-row-selection paths handle duplicate input keys consistently. Inline suggestion attached.
  2. Replace .unwrap_or(u64::MAX) with .expect("invariant: ..."). Inline suggestion attached.

Comment thread src/parquet_provider.rs
/// Object-store path per file, indexed by file_idx encoded in the usearch key.
file_keys: Vec<String>,
store: Arc<dyn ObjectStore>,
pub schema: SchemaRef,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 suggestion: schema is pub but ParquetLookupProvider already implements TableProvider::schema() which returns self.schema.clone(). Making the field public leaks the internal SchemaRef directly and breaks encapsulation — callers could substitute a different Arc without going through any validation. Suggest making it private and relying on the trait method (or adding a pub fn schema(&self) -> SchemaRef getter if needed outside a TableProvider context).

Comment thread src/keys.rs

impl DatasetLayout {
/// Convert a packed usearch key back to a global (dataset-wide) row index.
#[inline]

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 suggestion: This function is public and indexes into file_cum_rows / rg_cum_rows without bounds checking. A stale or mismatched key (e.g. from an index built against a different dataset version) will panic rather than surface an actionable error. Consider returning Option<u64> and using get instead of direct indexing:

Suggested change
#[inline]
pub fn packed_key_to_global(&self, key: u64) -> Option<u64> {
let (file_idx, rg_idx, local_offset) = unpack_key(key);
let file_base = self.file_cum_rows.get(file_idx)?;
let rg_base = self.rg_cum_rows.get(file_idx)?.get(rg_idx)?;
Some(file_base + rg_base + local_offset as u64)

At minimum, the doc-comment should document the panic conditions the same way pack_key does.

Comment thread src/sqlite_provider.rs
}

impl SqliteLookupProvider {
/// Open the existing SQLite database at `db_path`, or build it from

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 suggestion: cache_size = -65536 sets a 64 MB page cache per connection. With the default pool_size = 4 used in tests, that's 256 MB just for the SQLite page cache. This should either be documented in the open_or_build API (so callers can size pool_size knowing the memory cost) or exposed as a parameter alongside pool_size.

Comment thread src/sqlite_provider.rs
#[allow(clippy::too_many_arguments)]
pub fn open_or_build(
db_path: &str,
table_name: &str,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 suggestion: build_table identifies the primary-key column by checking f.name() == "row_idx" and execute_query_sync hardcodes WHERE row_idx IN (...). This implicitly requires that the schema passed here has row_idx as its first field. That contract is invisible from this signature — callers who build a schema with a different field order or name will get a silently corrupt table. Consider adding a doc-comment spelling it out, or asserting at the top of open_or_build that schema.field(0).name() == "row_idx".

@anoop-narang anoop-narang merged commit 285e87c into main Mar 16, 2026
5 checks passed
@anoop-narang anoop-narang deleted the feat/parquet-sqlite-providers branch March 16, 2026 12:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant